package com.example.springbootdockertest.controller.kafka;

import cn.hutool.core.date.TimeInterval;
import cn.hutool.core.util.StrUtil;
import com.example.springbootdockertest.listener.kafka.KafkaSendResultHandler;
import com.googlecode.aviator.AviatorEvaluator;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

import java.util.Properties;
import java.util.UUID;
import java.util.stream.LongStream;

/**
 * kafka生产者测试
 *
 * @Author liguangcheng
 * @Date 2021/10/5 9:08 上午
 * @Vision 1.0
 **/
@RestController
@Slf4j
@EnableAsync
public class KafkaProducerTest {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private String enableAutoCommit;
    @Value("${spring.kafka.bootstrap-servers}")
    private String kafkaConsumerBootstrapServers;
    @Autowired
    private KafkaSendResultHandler kafkaSendResultHandler;

    @Autowired
    ApplicationContext applicationContext;

    //发送消息
    @GetMapping("send/{message}")
    public String send(@PathVariable("message") String str) {
        kafkaTemplate.setProducerListener(kafkaSendResultHandler);
        ProducerRecord<String, String> resultRecord = new ProducerRecord<>("sendTest", UUID.randomUUID().toString(), str);
        kafkaTemplate.send(resultRecord).addCallback(result -> log.info("callback发送消息成功-->{}", result.toString()), ex -> System.out.println("callback发送消息失败"));

        return "发送消息结束";
    }

    //发送消息
    @GetMapping("send1/{message}")
    public String send1(@PathVariable("message") String str) {
        ProducerRecord<String, String> resultRecord = new ProducerRecord<>("sendTest", UUID.randomUUID().toString(), str);
        try {
            kafkaTemplate.send(resultRecord);
            System.out.println("发送消息成功");
            return "发送消息成功";
        } catch (Exception e) {
            System.out.println(StrUtil.format("发送消息失败-->{}", e.getMessage()));
            return "发送消息失败";
        }

    }

    //发送消息
    @GetMapping("send2/{message}")
    public String send2(@PathVariable("message") String str) {
        try {
            kafkaTemplate.send("sendTest", UUID.randomUUID().toString(), str);
            System.out.println("发送消息成功");
            return "发送消息成功";
        } catch (Exception e) {
            System.out.println(StrUtil.format("发送消息失败-->{}", e.getMessage()));
            return "发送消息失败";
        }
    }

    //发送消息
    @GetMapping("send3/{message}")
    public String send3(@PathVariable("message") String str) {
        ProducerRecord<String, String> resultRecord = new ProducerRecord<>("sendTest", UUID.randomUUID().toString(), str);
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", kafkaConsumerBootstrapServers);
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("acks", "1");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        //发送
        producer.send(resultRecord);
        producer.flush();
        return "发送消息结束";
    }


    public static void main(String[] args) {
        String str = "1+2";
        System.out.println(AviatorEvaluator.execute(str));
        String s = "true";
        Boolean x = Boolean.valueOf("true");
        Boolean y = Boolean.valueOf("true");
        System.out.println(x);
        System.out.println(y);
        System.out.println(x == y);
        Boolean a = new Boolean(s);
        Boolean b = new Boolean(s);
        System.out.println(a);
        System.out.println(b);
        System.out.println(a == b);
        Boolean c = new Boolean("s");
        Boolean d = new Boolean("s");
        System.out.println(c);
        System.out.println(d);
        System.out.println(c == d);

        Long sum = 0L;
        long start1 = System.currentTimeMillis();
        //for (Long i = 1L; i <= 10_0000_0000; i++) {
        //    sum += i;
        //}
        long end1 = System.currentTimeMillis();
        System.out.println("sum=" + sum + " 时间：" + (end1 - start1));


        TimeInterval timeInterval = new TimeInterval();
        timeInterval.start();
        long start = System.currentTimeMillis();
        long reduce = LongStream.rangeClosed(0L, 10_0000_0000).parallel().reduce(0, Long::sum);
        long reduce1 = LongStream.rangeClosed(0L, 10_0000_0000).parallel().reduce(0, Long::sum);
        long end = System.currentTimeMillis();
        System.out.println("sum=" + reduce + "时间：" + (end - start));
        System.out.println("sum=" + reduce1 + "时间：" + (end - start));
        System.out.println(timeInterval.interval());

        int l1 = 10_0000_0000;
        int l2 = 1000000000;
        System.out.println(l1 == l2);


    }
}
